# 01 Identifying individuals, variables and categorical variables in a data set

In [1]:
%%html
<!-- Embeds a YouTube video (id EqeVXI4WNHM) in a 700x400 iframe with fullscreen allowed. -->
<iframe width="700" height="400" src="https://www.youtube.com/embed/EqeVXI4WNHM/" frameborder="0" allowfullscreen></iframe>

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
from sklearn import preprocessing

In [2]:
# Spark bootstrap: request the Kafka connector packages, locate the Spark
# installation, then create the session used by every Spark cell below.
import os
# Passes `--packages` with the Spark/Kafka connector artifacts (Scala 2.12, 3.1.3) to
# pyspark-shell. NOTE(review): presumably this has to be set before the JVM is
# started, which is why it precedes findspark.init() and the pyspark imports — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 pyspark-shell'

import findspark

# Must run before the pyspark imports that follow it in this cell.
findspark.init()
from pyspark.context import SparkContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import functions as F
# NOTE(review): wildcard import; none of the visible cells reference a name from
# pyspark.sql.types directly — consider importing explicit names instead.
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession

# Local-mode session named "statistics"; getOrCreate() reuses an existing session if there is one.
spark = SparkSession.builder.appName("statistics").master("local").getOrCreate()

In [8]:
import json
import uuid
from confluent_kafka import Producer
from ksql import KSQLAPI
# Connection settings shared by the Kafka, KSQL and Spark-streaming cells.
bootstrap_servers = '[::1]:9092'  # Kafka broker on the IPv6 loopback address

# Fresh topic name per execution: fixed prefix + first 7 hex digits of a random UUID.
topic = 'stats0101' + uuid.uuid4().hex[:7]

msg_count = 5  # not referenced by any cell visible in this notebook section

# REST client for the KSQL server.
client = KSQLAPI(url='http://localhost:8088')

Source: [Khan Academy](https://www.khanacademy.org/math/ap-statistics/analyzing-categorical-ap/analyzing-one-categorical-variable/v/identifying-individuals-variables-and-categorical-variables-in-a-data-set?modal=1)

![Identifying individuals, variables and categorical variables in a data set fig 1](./imgs/01-01-01.png)![Identifying individuals, variables and categorical variables in a data set fig 2](./imgs/01-01-02.png)

## data

### dataset

In [27]:
# Column-oriented example table: each key is a variable and position i across
# the lists describes one individual (a drink). "Type" is the only categorical
# variable; the remaining columns are numeric.
dataset = {
    "Drink": ["Brewed coffee", "Caffe latte", "Caffe mocha", "Cappuccino", "Iced brewed coffee", "Chai latte"],
    "Type": ["Hot", "Hot", "Hot", "Hot", "Cold", "Hot"],
    "Calories": [4, 100, 170, 60, 60, 120],
    "Sugars (g)": [0, 14, 27, 8, 15, 25],
    "Caffein (mg)": [260, 75, 95, 75, 120, 60],
}

### Pandas Dataframe

In [28]:
# One row per drink; the drink name becomes the row label so the remaining
# columns are exactly the variables.
df = pd.DataFrame(data=dataset).set_index(keys="Drink")
df

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg)
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Brewed coffee,Hot,4,0,260
Caffe latte,Hot,100,14,75
Caffe mocha,Hot,170,27,95
Cappuccino,Hot,60,8,75
Iced brewed coffee,Cold,60,15,120
Chai latte,Hot,120,25,60


### Spark Dataframe

In [29]:
# Transpose the column-wise dict into row tuples for Spark; column names keep
# the dict's key order.
column_names = list(dataset)
rows = list(zip(*dataset.values()))
sdf = spark.createDataFrame(rows, schema=column_names)
sdf.show()

+------------------+----+--------+----------+------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
+------------------+----+--------+----------+------------+
|     Brewed coffee| Hot|       4|         0|         260|
|       Caffe latte| Hot|     100|        14|          75|
|       Caffe mocha| Hot|     170|        27|          95|
|        Cappuccino| Hot|      60|         8|          75|
|Iced brewed coffee|Cold|      60|        15|         120|
|        Chai latte| Hot|     120|        25|          60|
+------------------+----+--------+----------+------------+



### Kafka producer

In [30]:
def _report_delivery(err, msg):
    """Delivery callback: print broker-side failures instead of dropping them silently."""
    if err is not None:
        print(f"Delivery failed for topic {msg.topic()}: {err}")


# Publish every drink as one JSON-encoded message on `topic`.
p = Producer({'bootstrap.servers': bootstrap_servers})
for row in zip(*dataset.values()):
    # Re-attach the column names so each message is a self-describing JSON object.
    record_value = json.dumps(dict(zip(dataset.keys(), row)))
    p.produce(topic, value=record_value, on_delivery=_report_delivery)
    # Serve any pending delivery callbacks without blocking.
    p.poll(0)

# Block until the queue is drained; the displayed value is the number of
# messages still waiting for delivery (0 means everything was sent).
p.flush()

0

### KSQL stream

In [31]:
# Declare a KSQL stream over the Kafka topic, then print whatever the query returns.
client.create_stream(
    table_name=topic,
    columns_type=[
        '`Drink` string',
        '`Type` string',
        '`Calories` bigint',
        '`Sugars (g)` bigint',
        '`Caffein (mg)` bigint',
    ],
    topic=topic,
    value_format='JSON',
)
res = client.query(f"select * from {topic}")
try:
    # A for-loop also ends cleanly when the generator is simply exhausted,
    # which the previous bare next() loop would have surfaced as an
    # uncaught StopIteration.
    for chunk in res:
        print(chunk)
except RuntimeError:
    # NOTE(review): the ksql client's generator appears to end with RuntimeError
    # (presumably an internal StopIteration converted under PEP 479) — confirm.
    # Treated as end-of-results, exactly as before.
    pass
print('')

[{"header":{"queryId":"transient_STATS0101E908401_8697743092071727740","schema":"`Drink` STRING, `Type` STRING, `Calories` BIGINT, `Sugars (g)` BIGINT, `Caffein (mg)` BIGINT"}},

{"row":{"columns":["Brewed coffee","Hot",4,0,260]}},

{"row":{"columns":["Caffe latte","Hot",100,14,75]}},

{"row":{"columns":["Caffe mocha","Hot",170,27,95]}},

{"row":{"columns":["Cappuccino","Hot",60,8,75]}},

{"row":{"columns":["Iced brewed coffee","Cold",60,15,120]}},

{"row":{"columns":["Chai latte","Hot",120,25,60]}},

]




### Spark stream

#### read stream

In [32]:
# Streaming source over the Kafka topic, starting from the earliest offset so
# the messages produced above are included.
df_raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', bootstrap_servers)
    .option("startingOffsets", "earliest")
    .option('subscribe', topic)
    .load()
)
df_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### write stream

In [66]:
# Kafka delivers the payload as bytes: cast it to text, then parse the JSON
# using the schema of the batch Spark DataFrame built earlier.
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')
parsed_stream = df_json.select(
    F.from_json(df_json["json"], sdf.schema).alias('stream_data')
)

# Run a single trigger into an in-memory table named after the topic and wait
# for that run to finish.
memory_query = (
    parsed_stream.writeStream
    .queryName(topic)
    .trigger(once=True)
    .format("memory")
    .start()
)
memory_query.awaitTermination()

In [68]:
# The memory sink registered its output under the query name (the topic).
stream_snapshot = spark.sql(f"select * from {topic}")
stream_snapshot.show(truncate=False)

+---------------------------------------+
|stream_data                            |
+---------------------------------------+
|{Brewed coffee, Hot, 4, 0, 260}        |
|{Caffe latte, Hot, 100, 14, 75}        |
|{Caffe mocha, Hot, 170, 27, 95}        |
|{Cappuccino, Hot, 60, 8, 75}           |
|{Iced brewed coffee, Cold, 60, 15, 120}|
|{Chai latte, Hot, 120, 25, 60}         |
+---------------------------------------+



## Find and Replace

### Pandas

In [11]:
# Map the two categories of "Type" to integers (Hot -> 1, Cold -> 0).
# replace() already returns a new frame, so `df` stays untouched without the
# copy() + inplace=True combination.
df_fr = df.replace({"Type": {"Hot": 1, "Cold": 0}})
df_fr

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg)
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Brewed coffee,1,4,0,260
Caffe latte,1,100,14,75
Caffe mocha,1,170,27,95
Cappuccino,1,60,8,75
Iced brewed coffee,0,60,15,120
Chai latte,1,120,25,60


### Spark

In [25]:
# Spark counterpart of the pandas replace: Hot -> 1, Cold -> 0.
# There is no otherwise() branch, so per Spark's when() semantics any other
# value would become null.
type_flag = (
    F.when(F.col("Type") == "Hot", 1)
    .when(F.col("Type") == "Cold", 0)
)
sdf_fr = sdf.withColumn("Type", type_flag)
sdf_fr.show()

+------------------+----+--------+----------+------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
+------------------+----+--------+----------+------------+
|     Brewed coffee|   1|       4|         0|         260|
|       Caffe latte|   1|     100|        14|          75|
|       Caffe mocha|   1|     170|        27|          95|
|        Cappuccino|   1|      60|         8|          75|
|Iced brewed coffee|   0|      60|        15|         120|
|        Chai latte|   1|     120|        25|          60|
+------------------+----+--------+----------+------------+



## Label Encoding

### Pandas

In [44]:
# Label-encode "Type" with pandas' categorical codes and keep the original
# column alongside the new one.
type_codes = df["Type"].astype("category").cat.codes
df_lc = df.assign(Type_Hot=type_codes)
df_lc

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg),Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,Hot,4,0,260,1
Caffe latte,Hot,100,14,75,1
Caffe mocha,Hot,170,27,95,1
Cappuccino,Hot,60,8,75,1
Iced brewed coffee,Cold,60,15,120,0
Chai latte,Hot,120,25,60,1


### Sklearn

In [80]:
# Same label encoding, this time produced by scikit-learn's LabelEncoder.
label_encoder = preprocessing.LabelEncoder()
df_lc = df.assign(Type_Hot=label_encoder.fit_transform(df["Type"]))
df_lc

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg),Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,Hot,4,0,260,1
Caffe latte,Hot,100,14,75,1
Caffe mocha,Hot,170,27,95,1
Cappuccino,Hot,60,8,75,1
Iced brewed coffee,Cold,60,15,120,0
Chai latte,Hot,120,25,60,1


### Spark

In [73]:
# Label encoding in Spark: fit a StringIndexer on "Type", then append the
# resulting index as "Type_Cold".
type_indexer = StringIndexer(inputCol="Type", outputCol="Type_Cold")
type_indexer_model = type_indexer.fit(sdf)
sdf_lc = type_indexer_model.transform(sdf)
sdf_lc.show()

+------------------+----+--------+----------+------------+---------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|
+------------------+----+--------+----------+------------+---------+
|     Brewed coffee| Hot|       4|         0|         260|      0.0|
|       Caffe latte| Hot|     100|        14|          75|      0.0|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|
|        Cappuccino| Hot|      60|         8|          75|      0.0|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|
|        Chai latte| Hot|     120|        25|          60|      0.0|
+------------------+----+--------+----------+------------+---------+



## One-Hot Encoding

### Pandas

In [46]:
# One indicator column per category of "Type".
# dtype=int pins the indicators to 0/1: newer pandas releases default to
# booleans, which would no longer match the table shown below.
df_ohc = pd.get_dummies(df, columns=["Type"], prefix="Type", dtype=int)
df_ohc

Unnamed: 0_level_0,Calories,Sugars (g),Caffein (mg),Type_Cold,Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,4,0,260,0,1
Caffe latte,100,14,75,0,1
Caffe mocha,170,27,95,0,1
Cappuccino,60,8,75,0,1
Iced brewed coffee,60,15,120,1,0
Chai latte,120,25,60,0,1


### Sklearn

In [82]:
# One-hot encode "Type" with scikit-learn and stitch the indicator columns
# back next to the remaining variables.
type_encoder = preprocessing.OneHotEncoder()
type_matrix = type_encoder.fit_transform(df[["Type"]]).toarray()

# Take the labels from the fitted encoder so column names always follow the
# order of the encoded columns instead of relying on a hard-coded list.
type_columns = [f"Type_{category}" for category in type_encoder.categories_[0]]

df_ohc = pd.concat(
    [
        # reset_index() gives both pieces the same positional index for the concat.
        df.drop(columns="Type").reset_index(),
        pd.DataFrame(type_matrix, columns=type_columns),
    ],
    axis=1,
)
# Shown with the drink name as the row label; df_ohc itself keeps "Drink" as a column.
df_ohc.set_index("Drink")

Unnamed: 0_level_0,Calories,Sugars (g),Caffein (mg),Type_Cold,Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,4,0,260,0.0,1.0
Caffe latte,100,14,75,0.0,1.0
Caffe mocha,170,27,95,0.0,1.0
Cappuccino,60,8,75,0.0,1.0
Iced brewed coffee,60,15,120,1.0,0.0
Chai latte,120,25,60,0.0,1.0


### Spark

In [75]:
# One-hot encode the numeric label produced by the StringIndexer cell;
# the result is stored as a vector column "Type_Vec".
type_vector_encoder = OneHotEncoder(inputCol="Type_Cold", outputCol="Type_Vec")
sdf_ohc = type_vector_encoder.fit(sdf_lc).transform(sdf_lc)
sdf_ohc.show()

+------------------+----+--------+----------+------------+---------+-------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|     Type_Vec|
+------------------+----+--------+----------+------------+---------+-------------+
|     Brewed coffee| Hot|       4|         0|         260|      0.0|(1,[0],[1.0])|
|       Caffe latte| Hot|     100|        14|          75|      0.0|(1,[0],[1.0])|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|(1,[0],[1.0])|
|        Cappuccino| Hot|      60|         8|          75|      0.0|(1,[0],[1.0])|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|    (1,[],[])|
|        Chai latte| Hot|     120|        25|          60|      0.0|(1,[0],[1.0])|
+------------------+----+--------+----------+------------+---------+-------------+

